「AWS Black Belt Tech Webinar 2015 – AWS Data Pipeline」レポート
こんにちは、虎塚です。
9月9日(水)の夕方に実施された「AWS Black Belt Tech Webinar 2015 – AWS Data Pipeline」を聴講したので、レポートします。講師は、アマゾン データ サービス ジャパンの今井さんでした。
Data Pipelineは、ジョブの定期実行のために利用できるAWSのサービスです。昨年のBlack Belt Tech Webinarからアップデートされていましたので、ふたたびレポートします。
1. Data Pipelineとはなにか
一言でいうと、サービス間のデータ統合・処理をスケジュールベースで自動化するサービス。
AWS Data Pipelineでできること
Data Pipelineでは、データの分析、集計、レポート作成などに必要な次のような処理を実行できる。
- サーバやDBからデータを回収
- ETL(前処理)
- Redshiftへのデータロード
AWSのビッグデータプラットフォーム
AWSのビッグデータ関連サービスには、次のようなものがある。これらの間をつなぐのがData Pipelineである。
- Data Collection and Storage
- S3, Kinesis, DynaDB
- Event Processing
- AWS Lambda, KCL Apps (Kinesis Client Library), EMR
- Data Processing
- EMR
- Data Analysis
- Redshift, Machine Learning
データの前処理は面倒くさい
たとえば、データを集めてDBにロードするバッチ処理を記述すると、データ回収や集計などのロジックだけでなく、スケジュール管理や失敗時のリトライのための状態管理などの例外処理も必要になるため、コードが長大になりがちだ。
データの前処理では、次のような部分が面倒くさい。
- 異なるストレージ、異なるフォーマットからのデータのマイグレーション
- 処理どうしの依存関係の管理
- 異常処理としてのリトライ、タイムアウト、イベント通知
ユースケース例
ここで、「定期的に起動中のEC2リストを取得して、S3に保存する」処理をスクリプトで実行する場合を考えてみる。通常は次のようにおこなうが、いくつかの課題がある。
- スクリプトを書く
- EC2にスクリプトを配置してcronで実行する
- EC2が落ちたらどうする?
- cronログはどこで管理する?
- EC2の時計がずれたらどうする?
こういった課題に、Data Pipelineでは対応できる。
Data Pipelineでの処理
上記の処理を例に、Data Pipelineでおこなう設定(処理の定義)の流れを見てみる。
- Pipelineを作成する
+ Pipelineの名前を決める + あらかじめ定義されたテンプレートを使う (Build using a template) / 自分で作成したJSONをインポートする (Import a definition) / 画面で設定する (Build using Architect, 今回はこれ) + 処理実行の間隔(インターバル)を設定する + Pipeline実行ログの出力先のS3バケットを指定する(任意) + Data Pipelineが利用するIAM Roleを指定する (Pipeline Role) + Data Pipelineが起動するEC2などが利用するIAM Roleを指定する (EC2 Instance Role) 2. ブラウザに表示されるキャンバス(Architect画面)の中で設定をおこなう + 歯車アイコンはアクティビティ。どんな処理を、どんなインターバルで、どのリソース上で動かすか定義する + 茶筒(笑)アイコンはデータノード。S3, Redshift, RDSを表わす + リソースは、実際に処理を実行するEC2やEMR定義の集合。アクティビティにattachする + スケジュールは、処理のインターバル定義の集合。アクティビティにattachする + アクションは、タスク成功/失敗時の通知などの定義の集合。アクティビティにattachする
上記で、アクティビティ、データノード、リソース、スケジュール、アクションという言葉が出てきた。Data Pipeline処理は、これらを使って定義する。
それぞれの言葉が指す概念は、あとで詳しく解説することにして、まずは具体的な設定内容を紹介する。
データノード
- Type
- S3, Redshift, RDSなどから選ぶ
- Directory Path
- S3バケットのパスを指定
アクティビティ
アクティビティとして、次の内容を定義する。
- Type
- ShellCommandActivity(指定したシェルスクリプトを実行)を選択する
- 選択可能な他のアクティビティは後述
- Schedule
- 処理インターバルを指定する。たとえば15分ごとなど
- 後述のスケジュール定義をこのプロパティから参照する
- Runs On
- アクティビティを実行するEC2を指定する
- 後述のリソース定義をこのプロパティから参照する
- Command
- シェルスクリプトを直書きする
- または、Script Uriプロパティで、S3に配置したスクリプトファイルを指定する
- Stage
- Trueを指定すると、${INPUT1_STAGING_DIR}, ${OUTPUT1_STAGING_DIR}という変数を利用できる(後述のステージングについてを参照)
- Output
- 結果を出力するS3バケットを指定する
- On Success / On Fail
- 成功時/失敗時のアクションを指定する
サポートされるアクティビティには、上記で指定したShellCommandActivtyのほかに、CopyActivity, EmrActivity, HiveActivity, HiveCopyActivity, PigActivity, RedshiftCopyActivity, SqlActivityがある。
スケジュール
スケジュールは、処理を実行するタイミングのこと。
- Type
- Scheduleを指定する
- Period
- インターバル(処理の実行間隔)を15分〜3年の間で設定する。最小は15分
- Start At
- スケジュールの開始日時を指定する
リソース
- Type
- Ec2Resource
- SubnetId
- リソースを起動するVPC Subnetを指定
- Schedule
- リソースを起動するインターバル
- Terminate After
- 起動したリソースの削除時間の設定
- Image Id/Instance Type
- 起動するリソースのAMIやインスタンスタイプ
リソース側にもスケジュールの設定があることに注意。たとえば、Scheduleを1時間に設定して、Terminate Afterを15分に設定すると、指定したリソースが1時間ごとに起動して、その都度15分たつと削除される。
アクション
- Type
- SnsAlarm, Terminateから選択
- Role
- アクションを実行するIAM Role
- Subject
- メッセージのタイトル
- Message
- メッセージ本文
- Topic Arn
- SNSのTopic ARN
ここまでのまとめ
以上を設定してActivateすると、定義にしたがってジョブが実行される。
- リソースでの定義にしたがって、EC2を15分ごとに起動する
- アクティビティで定義したコマンドがEC2上で実行される
- 今回の場合、起動中のEC2リストを取得してS3に保存
- 変数${OUTPUT1_STAGING_DIR}を定義しておくと、そこに配置されたファイルがS3へアップロードされる。S3バケットは、データノード定義で指定
- Terminate Afterで定義した時間がくると削除される
Execution Details画面で、リソースの起動ログやジョブの実行ログを確認できる。時計は、Data Pipeline(AWSサービス側)のものが使われる。
Pipelineのテンプレートには、たとえば次のようなものがある。
- Hadoopジョブのキック
- DynamoDBへのデータインポート
- RDSへのデータインポート
Data Pipelineを使いはじめるには
上記で具体的な例を見たが、改めてData Pipelineの仕組みを体系的に見てみる。Data Pipelineでは、次の要素を組み合わせてワークフローを定義する。
- データ: データソースや出力先の定義
- アクティビティ: 処理の内容を定義
- スケジュール: 処理の依存関係とスケジュールを定義
- 通知: イベントの通知先を定義
オンプレミス環境と連携することもできる。
ワークフロー定義の登場人物
実際にワークフロー定義を記述する言葉で、上記の要素を言い換えると、次のようになる。
- Activity: データ処理のアクティビティ
- Data Node: データの場所、フォーマット
- Schedule: 処理実行のスケジュール
- Resource: 処理や条件チェックをするリソース
- Precondition: 処理実行の条件
- 例: S3にこのファイルがあったら実行するとか、ジョブの待ち合わせとか
- Action: 通知を送る方法
以降で、それぞれの登場人物の詳細を改めて確認する。
アクティビティ (Activities)
アクティビティは、データ移動や処理の全体を管理する。入出力、スケジュール、処理内容、リソース、通知アクションを定義できる。
定義した処理は、AWSとオンプレミスのどちらでも実行できる。
サポートされるアクティビティ一覧は、次のページを参照。CopyActivityや、後で例を示すShellCommandActivityなどがある。
データノード (Data Nodes)
Input/Outputデータの場所やタイプを定義する。
- S3バケットのバス
- DynamoDB
- Redshift
- その他、AWSサービスに限らず、SQLを解釈可能な各種データベース
フォーマットは、CSVをはじめカスタマイズした形式が自由に利用できる。
スケジュール (Schedule)
処理を実行するタイミングを定義する。いちばん短いインターバルは15分で、それより短くすることはできない。
- cronスタイル: 指定した間隔のstart時点で起動
- Time Seriesスタイル: 指定した間隔のend時点で起動
- ただし、EC2、EMRのリソースは常にstart時点に作成
- 間隔: 15分、時、日、週など
- 15分〜3年で指定できる
- Backfillタスク
- 開始時間に過去を指定すると、現在までさかのぼってタスクを繰り返し実行する(テストに便利)
- 開始時間が1日以前の場合、タスクを起動しない
- CLIから--force引数を指定すると起動できる
- タイムゾーン
- 初期値はUTCで、"YYYY-MM-DDTHH:MM:SS"フォーマット
- 変数にタイムゾーンを指定できる
- #{inTimeZone(myDateTime,'Asia/Tokyo')}
リソース (Resource)
タスクを実行するリソースを定義する。
- EC2
- EC2-Classic, EC2-VPC両方サポート
- EMR
- タスクノードにspot instanceを利用できる
- Multi-regionのリソース管理ができる
なお、リソースの実体はTask Runnerと呼ばれるAWS Data Pipelineのエージェントプロセス (Java)である。これが、DataPipelineのコントロールブレーンとやりとりする。
Task Runnerは、ただのJavaプロセスなので、既存のEC2(Data Pipelineが起動するリソースではないEC2)やオンプレミスのサーバでも動かすことができる。
$ java -jar TaskRunner-1.0.jar --config ~/credentials.json --workerGroup=WorkerGroup1 --region=MyRegion --logUri=s3://mybucket/foldername
リソースは、スケジュール定義にしたがって起動し、終了する。スケジュールのカスケードは自動的に行われるが、アクティビティとリソースのスケジュールを別々に指定することもできる。
たとえば、20分に1度EC2を起動すると、その都度1時間分の利用費がかかってしまう。そこで、リソースのスケジュールは1時間に1回、アクティビティのスケジュールを20分と定義することで、コストを節約してタスクを実行できる。
依存関係 (Preconditions)
条件が成立した場合のみ後続タスクを実行するように定義できる。次のような定義ができる。
- DynamoDBのテーブルが存在するとき/データがあるとき
- S3キーが存在するとき
- S3プリフィックスが存在するとき
- 独自のシェルこアンド実行が成功したとき
- 依存するPipelineタスクが成功したとき
その他のtips
データとテーブルのステージング
タスク実行時にデータへアクセスしやすくするための仕組みとして、ステージングと呼ばれる機能がある。
処理を実行するEC2やEMRの中に入力と出力のディレクトリを定義して、そこからデータを自動的にリソースへコピーしたり、テーブルを自動的に生成したりできる。これによって、処理対象のデータをリソースローカルに置くことができる。
たとえば、ShellCommandActivityでは、ステージングの設定値をtrueにすることで、変数${INPUTx_STAGING_DIR}と${OUTPUTx_STAGING_DIR}を使えるようになる。
イベントと異常管理
イベントが発生するタイミングとして、次の3つの状態がある。
- 成功時
- 失敗時
- 遅れが発生したとき
設定可能なアクションは、SNS通知とリソース削除の2種類。失敗時の自動リトライは、デフォルト3回で、初期実行を含め1〜6回の実行回数を設定できる。
ファイル出力
AWS Management ConsoleのGUIを操作して定義したワークフローの要素を、JSONに出力できる。Pipelineのバージョン管理に便利。
出力したファイルは、もちろんインポートして使うことができる。
料金
アクティビティ、または依存関係の従量課金(アクティビティ/依存関係の月額単価)になる。
AWS上の場合、1日に1回以上起動されるアクティビティについては$1.00、それより少なければ$0.60。 オンプレで動かす場合の料金や、実行しないPipelineの料金も定められている。
AWS Data Pipelineがもたらすもの
- 分離
- データと処理リソース
- 処理リソースと処理ロジック
- 処理ロジックとスケジュール
- 統合
- 分散環境での整合性
- 一貫したエラー処理や処理のリトライ
たとえば、S3に格納したデータを取り出して処理して、またS3に格納するような場合に、間の処理にData Pipelineを使うと便利になる。
2. Data Pipelineを使うべきではないところ
イベントドリブンな処理にはAWS Lambdaを使うことが望ましい。
「1時間に1回処理をしたい」という時間ベースの処理にはData Pipelineだが、「あるファイルがアップロードされたら後続の処理をしたい」ような場合は、AWS Lambdaを使う。
Pipelineのglue(糊)としてのAWS Lambda
データが格納されたことをトリガーに、AWS Lambdaを起動する。AWS Lambdaの事例として、Expediaがある。
S3にアップロードされたデータをトリガーに、AWS LambdaがEMRを起動してデータを処理した後、S3にデータを格納する。一連のログはDynamoDBに格納する。
AWS Lambdaでの処理の待ち合わせ
処理の待ち合わせには、DynamoDBを使用する。ファイルのカタログにDynamoDBを使用し、LambdaがDynamoDBを参照して処理に必要なファイルが揃っているかをチェックする。揃っていたら後続の処理を実行する。
3. Data Pipelineがフィットするケーススタディ
Data Pipelineには、1日に1回、週に1回、月に1回などの頻度で定期的におこなわれる処理がフィットする。たとえば、データのインポート、クロール、アップロード、バックアップと呼ばれる処理など。
これらにData Pipelineを適用すると、AWS的にコスト効率が高くなる。また、実行環境を定期的に手動で起動する必要がなくなる。
たとえば、RDSでない非マネージドなデータベースのバックアップも、Data Pipelineを使うと自動化できる。
定義済みのアクティビティからユースケースを考える
Data Pipelineにあらかじめ用意されたアクティビティを元に、どのようなユースケースがフィットするかを考えてみる。
- CopyActivity
- MysqlDataNodeからS3DataNodeへ、もしくはそれらどうしの間で、データをコピーする
- ユースケース: RDBの定期的なバックアップ
- SqlActivity
- データベースにログインしてSQLを発行する
- ユースケース: 定期的にローデータテーブルを集計して、結果をサマリテーブルに入れる
- EMRActivity
- EMRでJARを指定して処理を実行する。
- ユースケース: 指定するJAR次第でなんでもできる。ShellCommandActivity並みに自由度が高い。大量データに対するETLや集計、分析など
- HiveActivity
- EMR上でHiveを実行する
- ユースケース: 生データが格納されたS3バケットからデータを取り出して整形、フィルタして別のバケットに格納
- HiveCopyActivity
- S3とDynamoDBのデータ連携をおこなう
- PigActivity
- EMR上でPigを実行する
- RedshiftCopyActivity
- S3からRedshiftにデータを取り込む
- ShellCommandActivity
- シェルスクリプトを実行
- ユースケース: 前述の「定期的に起動中のEC2リストを取得して、S3に保存する」例のように、シェルスクリプトでできることならなんでもできる
re:Invent 2014では、Data Pipelineの事例セッションがあり、30〜40分かけてData Pipelineの多様な使い道が紹介された。(おそらく次のセッションのこと)
まとめ
AWS Data Pipelineを使うことで、バッチ処理を管理しやすくする。バッチ処理のスクリプトは、モノリシックな秘伝のタレになりがちだが、適切に分割してそれぞれ管理できるようになる。
よくある処理については、Data Pipelineが用意する様々なプリセットのアクティビティを使えるので、スクリプトを書く手間が省ける。また、処理の実行環境を自前で管理しなくてよくなり、Data Pipeineに丸ごとまかせることができる。
Q&A
- Q. EC2やEMRを使うほどでない小さな処理がしたいので、アクティビティにAWS Lambdaを使いたいです。
- A. 機能要望として承ります。EC2もEMRも時間単位の課金なので、100msec単位で課金できるといいですよね。
- Q. Terminate Afrerについて: S3にデータを保存する処理をする場合、確実にデータ格納された後、Terminate Afterの処理が行われますか?
- A. Noです。純粋に時間ベースで動くので、利用者側で余裕をみて安全率を設定してください。そもそも、EC2もEMRも起動時点で1時間分はかならず利用費がかかるので、1時間ないし単位時間を目一杯使えるようにTerminate Afterを指定するのがおすすめです。
- Q. (最初のデモのシナリオについて)スケジュールで指定した時間にリソースが起動するの? それともコマンドが実行されるの?
- A. 「15分に1回」と指定したリソースとアクティビティがある場合は、先にリソースが起動されます。アクティビティを実行しようとしたときに、Data Pipelineはリソースを探しにいきます。waiting task runnerという状態になり、リソースが起動するのを待ってから、アクティビティを実行します
- Q. オンプレサーバと連携する場合、ネットワークの経路はどうなりますか?
- A. オンプレ-AWSの間の接続方法とルーティングの設定に依存します。たとえば、インターネットでしか繋がっていなければ、インターネット経由になります。Direct Connectが繋がっている場合は、Public接続できる設定をすれば、Direct Connect経由になります。VPNでどうなるかは、確認して回答します
- Q. AMIはカスタマイズできますか
- A. Yes. cloud-initが入っているサーバであれば利用できます。Task Runnerがcloud-initで都度インストールされるためです
- Q. SQLを扱えるDBをデータノードとして使えるということは、Data Nodeのポートに繋いでSQLを発行しますか? DBMS側でポートへの接続を許可する必要がありますか? DBをグローバルに公開せずにできますか?
- A. すべてYesです。SQL Clientからコマンド発行するのは、リソース上になるので、実行するリソースが所属しているセキュリティグループやサブネットで設定をしてください。AWS内であれば、制御可能です。オンプレでどうなるかは確認して回答します。
おわりに
具体例をもとにした解説だったおかげで、Data Pipelineの定義に必要な概念がよく理解できるウェビナーでした。
リソースにAWS Lambdaが使えるようになると、さらにコストメリットが期待できそうなので、楽しみにしています!
それでは、また。